[AWS Step Functions]Mapステート内でエラーをキャッチしてステートマシンを失敗させる(AWS CDK)

[AWS Step Functions]Mapステート内でエラーをキャッチしてステートマシンを失敗させる(AWS CDK)

Mapステート内でエラーをキャッチする実装にした場合でも、ステートマシンがエラーになるように実装してみた。
Clock Icon2022.09.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

具体的なユースケース

Step FunctionsのMapステートを利用している方で、以下2つのケースを満たしたい方向けです。

  • Mapステートを利用していて、エラー時に並列処理を中断させたくない
  • Mapステート内で1つでもエラーがある場合、ステートマシンを失敗させたい

前置き

Mapステート内でエラーをキャッチした上で、ステートマシンを失敗させるなんてなんでややこしいことするの?と思うかもしれませんが、まずはMapステートの動作を理解する必要があります。

Mapステートのデフォルト動作

まずエラー時にMapステートのデフォルト動作を確認してみます。サンプルとして全リージョンにMapステートを並列実行するものを利用しています。

Map内の並列処理で1つでも失敗した場合、Mapステートが失敗となりステートマシン自体も失敗となりました。

このとき他の並列処理はキャンセル扱いとなってしまうため、最後まで処理されずにキャンセルされます。今回は1つのリージョンで処理が失敗したため、処理途中の13リージョンが途中でキャンセルされています。

このようなケースでは、失敗した1リージョンに原因があるのかわかりません、今回キャンセルされたリージョンが次の実行時にエラーとなる可能性もあるため、全リージョン実行して結果を確認したくなります。

並列処理を停止させない実装

そこで、並列処理を停止させずに完了させる方法を調べると、Map内でエラーをキャッチすることでMapステート全体が失敗することを防ぐことができるようです。

上記を参考にエラーをキャッチするステップをMap内に追加してみました。CDKのコードは以下ブログを参考にしてください。

これで実行してみるとエラーキャッチのステップ自体は成功となるため、想定通りMapステートが失敗することなく並列処理が実行されます。

これでMapステートが途中で止まることなく、全リージョンの処理を成功させることができました。

しかし、この場合はエラーがMap内でキャッチされてしまうためステートマシン自体が成功扱いとなります。(Mapステートが成功となるため)

Map処理を中断させないことは達成できましたが、このままではエラーに気づけない恐れが…。

そこで解決策の1つとして、Mapステート外でエラーを判定してステートマシンを失敗させる方法を考えました。

並列処理を失敗させずにステートマシンを失敗させる

前置きが長くなりましたが、ここからが本エントリの本題です。

考えた結果、以下のような形で実装してみました。Mapステートの後にMapステートの出力編集(result)とエラー判定(choice)し、成功(success)と失敗(failed)のステップへ分岐するようにしています。

ステートマシン

CDKサンプル

実装はCDK(Typescript)で行なっています。


import { Construct } from 'constructs';
import {
  Stack,
  StackProps,
  Duration,
  aws_iam as iam,
  aws_lambda as lambda,
  aws_stepfunctions as sfn,
  aws_stepfunctions_tasks as tasks,
} from 'aws-cdk-lib';

export class AwsCdkMultiRegionSfnStack extends Stack {

  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);
    //IAMロール
    const lambdaRole = new iam.Role(this, "lambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
    });
    lambdaRole.addManagedPolicy(
      iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
    );
    lambdaRole.addManagedPolicy(
      iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2ReadOnlyAccess")
    );
    //リージョン一覧を取得するLambda
    const createGetRegionLambdaFunction = new lambda.Function(this, 'createGetRegionLambdaFunction', {
      code: new lambda.AssetCode("lambda"),
      runtime: lambda.Runtime.PYTHON_3_9,
      functionName: "GetRegionLambdaFunction",
      handler: 'get_regions.handler',
      role: lambdaRole,
      timeout: Duration.seconds(180),
    });
    //リージョンを出力するLambda
    const createPrintRegionLambdaFunction = new lambda.Function(this, 'createPrintRegionLambdaFunction', {
      code: new lambda.AssetCode("lambda"),
      runtime: lambda.Runtime.PYTHON_3_9,
      functionName: "PrintRegionLambdaFunction",
      handler: 'print_region.handler',
      role: lambdaRole,
      timeout: Duration.seconds(180),
    });
    const getRegionsTask = new tasks.LambdaInvoke(this, 'getRegionsTask', {
      lambdaFunction: createGetRegionLambdaFunction,
      resultPath: '$.getRegions',
    });

    const printRegionTask = new tasks.LambdaInvoke(this, 'printRegionTask', {
      lambdaFunction: createPrintRegionLambdaFunction,
      resultSelector: {
        "Region.$": "$.Payload",
      },
    });

    //Mapステート
    const printRegionMap = new sfn.Map(this, 'printRegionMap', {
      inputPath: "$.getRegions.Payload.Regions",
      resultPath: "$.printRegionMap"
    });
    //MAPのイテレーターにRegion出力タスクを指定
    printRegionMap.iterator(printRegionTask)

    //MAP内エラー取得
    const printRegionTaskError = new sfn.Pass(this, 'printRegionTaskError')
    printRegionTask.addCatch(printRegionTaskError, { resultPath: '$.Error' })

    // result(結果の文字列を結合)
    const result = new sfn.Pass(this, 'result', {
      parameters: {
        "jsonToString.$": "States.JsonToString($.printRegionMap)"
      },
      resultPath: "$.result"
    });
    // 失敗
    const failed = new sfn.Fail(this, 'failed');
    // 成功
    const success = new sfn.Succeed(this, 'success');
    // エラー判定
    const check = new sfn.Choice(this, 'check');
    check.when(
      sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"),
      failed
    )
    check.otherwise(success)

    //ステートマシン
    const definition = getRegionsTask.next(printRegionMap).next(result).next(check);
    new sfn.StateMachine(this, 'RegionMapStateMachine', {
      stateMachineName: "RegionMapStateMachine",
      definition: definition,
    });

  }
}

Lambda(Python)

Lambdaはリージョン取得と、出力用で2つを用意しました。

リージョン取得用(getRegionsTask)

リージョンを取得したあと、リストの形式だとステートマシン内で扱いにくいため、オブジェクト(辞書型)の形式にしてreturnしています。

import boto3
def handler(event, context):
    ec2 = boto3.client('ec2')
    regions = list(map(lambda x: x['RegionName'], ec2.describe_regions()['Regions']))
    ret = []
    for region in regions:
        ret.append(
            {
                "Region": region,
            }
        )
    return {"Regions": ret}

リージョン出力用(printRegionTask)

こちらはインプットとしてRegionを取得してprintだけしています。今回はMap内で1つだけエラーとしたいため、東京リージョンのみエラーになるようにしています。

def handler(event, context):
    region = event.get("Region")
    print(region)
		if region == "ap-northeast-1":
		      raise
    return region

これらをCDKでデプロイしています。

やってみる

CDKでデプロイできたら、実際にエラーがうまく判定できるか試してみます。インプットは不要なので、そのまま実行してみるとするとステートマシンは想定通り失敗となりました。

Mapステートは成功していますが、後続のchoiceで failed に分岐しています。これは、Mapステートで並列実行された中でエラーとなったことを判定されたためです。複数リージョン失敗した場合でも、同じようにfailedに遷移します。

ここではどのようにエラー判定が行われているか分かりにくいので、実際の入出力を追ってみます。(全て記載すると、不要な情報も多いため解説に不要な部分は省略しています。)

入出力の確認

Mapステートの出力は各リージョンごとの結果が含まれるため、配列の形で出力されます。以下の場合、東京リージョンのみがエラーとなっているケースです。

Mapステート出力

"printRegionMap": [
    {
      "Region": "eu-north-1"
    },
    {
      "Region": "ap-northeast-1",
      "Error": {
        "Error": "RuntimeError",
        "Cause": "{\"errorMessage\":\"No active exception to reraise\",\"errorType\":\"RuntimeError\",\"requestId\":\"369e40bb-deb8-429d-af0d-058307aed690\",\"stackTrace\":[\"  File \\\"/var/task/print_region.py\\\", line 6, in handler\\n    raise\\n\"]}"
      }
    },
    {
      "Region": "sa-east-1"
    }
~~略~~
  ]

このままではエラーの判定ができないため、Mapステートの入力を次の result ステップで文字列を連結します。

result出力

"result": {
    "jsonToString": "[{\"Region\":\"eu-north-1\"},{\"Region\":\"ap-northeast-1\",\"Error\":{\"Error\":\"RuntimeError\",\"Cause\":\"{\\\"errorMessage\\\":\\\"No active exception to reraise\\\",\\\"errorType\\\":\\\"RuntimeError\\\",\\\"requestId\\\":\\\"369e40bb-deb8-429d-af0d-058307aed690\\\",\\\"stackTrace\\\":[\\\"  File \\\\\\\"/var/task/print_region.py\\\\\\\", line 6, in handler\\\\n    raise\\\\n\\\"]}\"}},{\"Region\":\"sa-east-1\"}]"
  }

CDK実装では以下の部分です。Mapステートの出力($.printRegionMap)を結合した結果をパラメータjsonToStringとして設定。出力先を$.resultとしています。

    // result(結果の文字列を結合)
    const result = new sfn.Pass(this, 'result', {
      parameters: {
        "jsonToString.$": "States.JsonToString($.printRegionMap)"
      },
      resultPath: "$.result"
    });

これでMapの出力が文字列として扱えるようになりました。

次に Choice のステップではresultの結果にErrorの文字列が含まれているかを判定。含まれている場合はfailedへ遷移するようにしています。

CDKでは以下の部分です。

    // エラー判定
    const check = new sfn.Choice(this, 'check');
    check.when(
      sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"),
      failed
    )
    check.otherwise(success)

一応成功するパターンも試しましたが、問題なくsuccessに遷移しました。

おわりに

これで無事にMapステートを失敗させずに、ステートマシンを失敗させることができました。これに合わせてステートマシンの失敗を通知する仕組みがあればエラーに気づくことができそうですね。

一応Mapステート内でエラー時に通知するステップを追加する方法も考えたのですが、全ての並列実行がエラーとなった通知量がすごいことになりそうなのでやめました。

ステートマシンの実行完了を通知する仕組みは、以下を参考にすると幸せになれるかもしれません。

AWS Step Functionsの実行完了をAWS Chatbotを使っていい感じにSlackに通知する | DevelopersIO

Step FunctionsでMapを実装している方の助けになれば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.